/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.io.storage.row.parquet;

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;

import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

/**
 * Schema converter converts Parquet schema to and from Flink internal types.
 *
 * <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter to support timestamp of INT64 8 bytes.
 */
public class ParquetSchemaConverter {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);
  public static final String MAP_VALUE = "value";
  public static final String LIST_ARRAY_TYPE = "array";
  public static final String LIST_ELEMENT = "element";
  public static final String LIST_GROUP_NAME = "list";
  public static final String MESSAGE_ROOT = "root";

  /**
   * Converts Parquet schema to Flink Internal Type.
   *
   * @param type Parquet schema
   * @return Flink type information
   */
  public static TypeInformation<?> fromParquetType(MessageType type) {
    return convertFields(type.getFields());
  }

  /**
   * Converts Flink Internal Type to Parquet schema.
   *
   * @param typeInformation Flink type information
   * @param legacyMode      is standard LIST and MAP schema or back-compatible schema
   * @return Parquet schema
   */
  public static MessageType toParquetType(
      TypeInformation<?> typeInformation, boolean legacyMode) {
    return (MessageType)
        convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode);
  }

  public static TypeInformation<?> convertFields(List<Type> parquetFields) {
    List<TypeInformation<?>> types = new ArrayList<>();
    List<String> names = new ArrayList<>();
    for (Type field : parquetFields) {
      TypeInformation<?> subType = convertParquetTypeToTypeInfo(field);
      if (subType != null) {
        types.add(subType);
        names.add(field.getName());
      } else {
        LOGGER.error(
            "Parquet field {} in schema type {} can not be converted to Flink Internal Type",
            field.getName(),
            field.getOriginalType().name());
      }
    }

    return new RowTypeInfo(
        types.toArray(new TypeInformation<?>[0]), names.toArray(new String[0]));
  }

  public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
    TypeInformation<?> typeInfo;
    if (fieldType.isPrimitive()) {
      OriginalType originalType = fieldType.getOriginalType();
      PrimitiveType primitiveType = fieldType.asPrimitiveType();
      switch (primitiveType.getPrimitiveTypeName()) {
        case BINARY:
          if (originalType != null) {
            switch (originalType) {
              case DECIMAL:
                typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
                break;
              case UTF8:
              case ENUM:
              case JSON:
              case BSON:
                typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type BINARY");
            }
          } else {
            typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
          }
          break;
        case BOOLEAN:
          typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
          break;
        case INT32:
          if (originalType != null) {
            switch (originalType) {
              case TIME_MICROS:
              case TIME_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIME;
                break;
              case TIMESTAMP_MICROS:
              case TIMESTAMP_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIMESTAMP;
                break;
              case DATE:
                typeInfo = SqlTimeTypeInfo.DATE;
                break;
              case UINT_8:
              case UINT_16:
              case UINT_32:
                typeInfo = BasicTypeInfo.INT_TYPE_INFO;
                break;
              case INT_8:
                typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE;
                break;
              case INT_16:
                typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT;
                break;
              case INT_32:
                typeInfo = BasicTypeInfo.INT_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type INT32");
            }
          } else {
            typeInfo = BasicTypeInfo.INT_TYPE_INFO;
          }
          break;
        case INT64:
          if (originalType != null) {
            switch (originalType) {
              case TIME_MICROS:
                typeInfo = SqlTimeTypeInfo.TIME;
                break;
              case TIMESTAMP_MICROS:
              case TIMESTAMP_MILLIS:
                typeInfo = SqlTimeTypeInfo.TIMESTAMP;
                break;
              case INT_64:
              case DECIMAL:
                typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type INT64");
            }
          } else {
            typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
          }
          break;
        case INT96:
          // It stores a timestamp type data, we read it as millisecond
          typeInfo = SqlTimeTypeInfo.TIMESTAMP;
          break;
        case FLOAT:
          typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
          break;
        case DOUBLE:
          typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
          break;
        case FIXED_LEN_BYTE_ARRAY:
          if (originalType != null) {
            switch (originalType) {
              case DECIMAL:
                typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
                break;
              default:
                throw new UnsupportedOperationException(
                    "Unsupported original type : "
                        + originalType.name()
                        + " for primitive type FIXED_LEN_BYTE_ARRAY");
            }
          } else {
            typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
          }
          break;
        default:
          throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
      }
    } else {
      GroupType parquetGroupType = fieldType.asGroupType();
      OriginalType originalType = parquetGroupType.getOriginalType();
      if (originalType != null) {
        switch (originalType) {
          case LIST:
            if (parquetGroupType.getFieldCount() != 1) {
              throw new UnsupportedOperationException(
                  "Invalid list type " + parquetGroupType);
            }
            Type repeatedType = parquetGroupType.getType(0);
            if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
              throw new UnsupportedOperationException(
                  "Invalid list type " + parquetGroupType);
            }

            if (repeatedType.isPrimitive()) {
              typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType);
            } else {
              // Backward-compatibility element group name can be any string
              // (element/array/other)
              GroupType elementType = repeatedType.asGroupType();
              // If the repeated field is a group with multiple fields, then its type
              // is the element
              // type and elements are required.
              if (elementType.getFieldCount() > 1) {

                for (Type type : elementType.getFields()) {
                  if (!type.isRepetition(Type.Repetition.REQUIRED)) {
                    throw new UnsupportedOperationException(
                        String.format(
                            "List field [%s] in List [%s] has to be required. ",
                            type.toString(), fieldType.getName()));
                  }
                }
                typeInfo =
                    ObjectArrayTypeInfo.getInfoFor(
                        convertParquetTypeToTypeInfo(elementType));
              } else {
                Type internalType = elementType.getType(0);
                if (internalType.isPrimitive()) {
                  typeInfo =
                      convertParquetPrimitiveListToFlinkArray(internalType);
                } else {
                  // No need to do special process for group named array and tuple
                  GroupType tupleGroup = internalType.asGroupType();
                  if (tupleGroup.getFieldCount() == 1
                      && tupleGroup
                      .getFields()
                      .get(0)
                      .isRepetition(Type.Repetition.REQUIRED)) {
                    typeInfo =
                        ObjectArrayTypeInfo.getInfoFor(
                            convertParquetTypeToTypeInfo(internalType));
                  } else {
                    throw new UnsupportedOperationException(
                        String.format(
                            "Unrecgonized List schema [%s] according to Parquet"
                                + " standard",
                            parquetGroupType.toString()));
                  }
                }
              }
            }
            break;

          case MAP_KEY_VALUE:
          case MAP:
            // The outer-most level must be a group annotated with MAP
            // that contains a single field named key_value
            if (parquetGroupType.getFieldCount() != 1
                || parquetGroupType.getType(0).isPrimitive()) {
              throw new UnsupportedOperationException(
                  "Invalid map type " + parquetGroupType);
            }

            // The middle level  must be a repeated group with a key field for map keys
            // and, optionally, a value field for map values. But we can't enforce two
            // strict condition here
            // the schema generated by Parquet lib doesn't contain LogicalType
            // ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
            GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
            if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
                || mapKeyValType.getFieldCount() != 2) {
              throw new UnsupportedOperationException(
                  "The middle level of Map should be single field named key_value. Invalid map type "
                      + parquetGroupType);
            }

            Type keyType = mapKeyValType.getType(0);

            // The key field encodes the map's key type. This field must have repetition
            // required and
            // must always be present.
            if (!keyType.isPrimitive()
                || !keyType.isRepetition(Type.Repetition.REQUIRED)
                || !keyType.asPrimitiveType()
                .getPrimitiveTypeName()
                .equals(PrimitiveType.PrimitiveTypeName.BINARY)
                || !keyType.getOriginalType().equals(OriginalType.UTF8)) {
              throw new IllegalArgumentException(
                  "Map key type must be required binary (UTF8): " + keyType);
            }

            Type valueType = mapKeyValType.getType(1);
            return new MapTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                convertParquetTypeToTypeInfo(valueType));
          default:
            throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
        }
      } else {
        // if no original type than it is a record
        return convertFields(parquetGroupType.getFields());
      }
    }

    return typeInfo;
  }

  private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
    // Backward-compatibility element group doesn't exist also allowed
    TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
    if (flinkType.isBasicType()) {
      return BasicArrayTypeInfo.getInfoFor(
          Array.newInstance(flinkType.getTypeClass(), 0).getClass());
    } else {
      // flinkType here can be either SqlTimeTypeInfo or BasicTypeInfo.BIG_DEC_TYPE_INFO,
      // So it should be converted to ObjectArrayTypeInfo
      return ObjectArrayTypeInfo.getInfoFor(flinkType);
    }
  }

  private static Type convertField(
      String fieldName,
      TypeInformation<?> typeInfo,
      Type.Repetition inheritRepetition,
      boolean legacyMode) {
    Type fieldType = null;

    Type.Repetition repetition =
        inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
    if (typeInfo instanceof BasicTypeInfo) {
      BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo;
      if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.DECIMAL)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_32)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.INT_64)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_16)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      }
    } else if (typeInfo instanceof MapTypeInfo) {
      MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;

      if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        fieldType =
            Types.map(repetition)
                .value(
                    convertField(
                        MAP_VALUE,
                        mapTypeInfo.getValueTypeInfo(),
                        Type.Repetition.OPTIONAL,
                        legacyMode))
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            String.format(
                "Can not convert Flink MapTypeInfo %s to Parquet"
                    + " Map type as key has to be String",
                typeInfo));
      }
    } else if (typeInfo instanceof ObjectArrayTypeInfo) {
      ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;

      // Get all required sub fields
      GroupType componentGroup =
          (GroupType)
              convertField(
                  LIST_ELEMENT,
                  objectArrayTypeInfo.getComponentInfo(),
                  Type.Repetition.REQUIRED,
                  legacyMode);

      GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
      elementGroup = elementGroup.withNewFields(componentGroup.getFields());
      fieldType =
          Types.buildGroup(repetition)
              .addField(elementGroup)
              .as(OriginalType.LIST)
              .named(fieldName);
    } else if (typeInfo instanceof BasicArrayTypeInfo) {
      BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;

      if (legacyMode) {

        // Add extra layer of Group according to Parquet's standard
        Type listGroup =
            Types.repeatedGroup()
                .addField(
                    convertField(
                        LIST_ELEMENT,
                        basicArrayType.getComponentInfo(),
                        Type.Repetition.REQUIRED,
                        legacyMode))
                .named(LIST_GROUP_NAME);

        fieldType =
            Types.buildGroup(repetition)
                .addField(listGroup)
                .as(OriginalType.LIST)
                .named(fieldName);
      } else {
        PrimitiveType primitiveTyp =
            convertField(
                fieldName,
                basicArrayType.getComponentInfo(),
                Type.Repetition.REQUIRED,
                legacyMode)
                .asPrimitiveType();
        fieldType =
            Types.buildGroup(repetition)
                .repeated(primitiveTyp.getPrimitiveTypeName())
                .as(primitiveTyp.getOriginalType())
                .named(LIST_ARRAY_TYPE)
                .as(OriginalType.LIST)
                .named(fieldName);
      }
    } else if (typeInfo instanceof SqlTimeTypeInfo) {
      if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.DATE)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.TIME_MILLIS)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.TIMESTAMP_MILLIS)
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            "Unsupported SqlTimeTypeInfo " + typeInfo.toString());
      }

    } else {
      RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
      List<Type> types = new ArrayList<>();
      String[] fieldNames = rowTypeInfo.getFieldNames();
      TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
      for (int i = 0; i < rowTypeInfo.getArity(); i++) {
        types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode));
      }

      if (fieldName == null) {
        fieldType = new MessageType(MESSAGE_ROOT, types);
      } else {
        fieldType = new GroupType(repetition, fieldName, types);
      }
    }

    return fieldType;
  }

  public static MessageType convertToParquetMessageType(String name, RowType rowType) {
    Type[] types = new Type[rowType.getFieldCount()];
    for (int i = 0; i < rowType.getFieldCount(); i++) {
      String fieldName = rowType.getFieldNames().get(i);
      LogicalType fieldType = rowType.getTypeAt(i);
      types[i] = convertToParquetType(fieldName, fieldType, fieldType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED);
    }
    return new MessageType(name, types);
  }

  private static Type convertToParquetType(
      String name, LogicalType type, Type.Repetition repetition) {
    switch (type.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .as(OriginalType.UTF8)
            .named(name);
      case BOOLEAN:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
            .named(name);
      case BINARY:
      case VARBINARY:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .named(name);
      case DECIMAL:
        int precision = ((DecimalType) type).getPrecision();
        int scale = ((DecimalType) type).getScale();
        int numBytes = computeMinBytesForDecimalPrecision(precision);
        return Types.primitive(
                PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
            .as(LogicalTypeAnnotation.decimalType(scale, precision))
            .length(numBytes)
            .named(name);
      case TINYINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.intType(8, true))
            .named(name);
      case SMALLINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.intType(16, true))
            .named(name);
      case INTEGER:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .named(name);
      case BIGINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
            .named(name);
      case FLOAT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
            .named(name);
      case DOUBLE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
            .named(name);
      case DATE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.dateType())
            .named(name);
      case TIME_WITHOUT_TIME_ZONE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
            .named(name);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        TimestampType timestampType = (TimestampType) type;
        if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) {
          TimeUnit timeunit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .as(LogicalTypeAnnotation.timestampType(true, timeunit))
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
        if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) {
          TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .as(LogicalTypeAnnotation.timestampType(false, timeunit))
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      case ARRAY:
        // <list-repetition> group <name> (LIST) {
        //   repeated group list {
        //     <element-repetition> <element-type> element;
        //   }
        // }
        ArrayType arrayType = (ArrayType) type;
        LogicalType elementType = arrayType.getElementType();
        return Types
            .buildGroup(repetition).as(OriginalType.LIST)
            .addField(
                Types.repeatedGroup()
                    .addField(convertToParquetType("element", elementType, repetition))
                    .named("list"))
            .named(name);
      case MAP:
        // <map-repetition> group <name> (MAP) {
        //   repeated group key_value {
        //     required <key-type> key;
        //     <value-repetition> <value-type> value;
        //   }
        // }
        MapType mapType = (MapType) type;
        LogicalType keyType = mapType.getKeyType();
        LogicalType valueType = mapType.getValueType();
        return Types
            .buildGroup(repetition).as(OriginalType.MAP)
            .addField(
                Types
                    .repeatedGroup()
                    .addField(convertToParquetType("key", keyType, Type.Repetition.REQUIRED))
                    .addField(convertToParquetType("value", valueType, repetition))
                    .named("key_value"))
            .named(name);
      case ROW:
        RowType rowType = (RowType) type;
        Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
        rowType.getFields().forEach(field -> builder.addField(convertToParquetType(field.getName(), field.getType(), repetition)));
        return builder.named(name);
      default:
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
  }

  public static int computeMinBytesForDecimalPrecision(int precision) {
    int numBytes = 1;
    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
      numBytes += 1;
    }
    return numBytes;
  }
}
